# ---
# title: AI-Powered Data Analyst
# description: Build a resilient AI data analyst using Prefect and `pydantic-ai` to analyze datasets, detect anomalies, and generate insights.
# icon: robot
# dependencies: ["prefect", "pydantic-ai[prefect]", "pandas"]
# keywords: ["ai", "agents", "pydantic-ai", "data-analysis", "llm", "durable-execution"]
# order: 6
# ---
#
# This example shows how to build resilient AI workflows using Prefect and `pydantic-ai`.
# The integration provides automatic retries for LLM calls, full observability of agent decisions,
# and durable execution semantics that make workflows idempotent and rerunnable.
#
# ## The Scenario: AI Data Analyst
#
# You need to analyze datasets programmatically, but writing custom analysis code for each dataset is time-consuming.
# Instead, you'll build an AI agent that:
#
# 1. Understands your dataset structure
# 2. Decides which analyses are most valuable
# 3. Uses Python tools to calculate statistics and detect anomalies
# 4. Generates actionable insights
#
# All while being resilient to LLM failures, tool errors, and network issues.
#
# This example demonstrates:
# * [`PrefectAgent`](https://ai.pydantic.dev/durable_execution/prefect/) – Wraps `pydantic-ai` agents for durable execution
# * **Agent Tools** – Python functions the AI can call, automatically wrapped as Prefect tasks
# * [`TaskConfig`](https://ai.pydantic.dev/durable_execution/prefect/#task-configuration) – Custom retry policies and timeouts for AI operations
# * [**Durable Execution**](https://ai.pydantic.dev/durable_execution/prefect/#durable-execution) – Automatic idempotency and failure recovery
#
# ## Setup
#
# Install dependencies (if not already installed):
# ```bash
# uv add "pydantic-ai[prefect]" pandas
# # or with pip:
# pip install "pydantic-ai[prefect]" pandas
# ```

from __future__ import annotations

from typing import Any

import pandas as pd
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig

from prefect import flow, task

# ## Agent Tools
#
# These functions are "tools" that the AI agent can call to analyze data.
# Prefect automatically wraps each tool execution as a task for observability and retries.


def _to_native_scalar(value: Any) -> Any:
    """Convert a pandas/NumPy scalar to the matching built-in Python type.

    Booleans are tested first because ``bool`` is a subclass of ``int``; without
    that, ``True`` would be reported as ``1`` (or ``1.0``). Values that are not
    a bool, integer, or float (strings, timestamps, ...) are returned unchanged.
    """
    if pd.api.types.is_bool(value):
        return bool(value)
    if pd.api.types.is_integer(value):
        return int(value)
    if pd.api.types.is_float(value):
        return float(value)
    return value


def calculate_statistics(ctx: RunContext[pd.DataFrame], column: str) -> dict[str, Any]:
    """Calculate descriptive statistics for a column.

    The AI agent can call this tool to understand a column's distribution.

    Args:
        ctx: Run context whose ``deps`` is the DataFrame being analyzed.
        column: Name of the column to summarize.

    Returns:
        The entries of ``Series.describe()`` for the column plus
        ``missing_count`` and ``unique_count``, with numeric values converted
        to built-in Python types (counts stay integers, booleans stay
        booleans). If the column does not exist, a dict with a single
        ``"error"`` key listing the available columns is returned instead.
    """
    df = ctx.deps
    if column not in df.columns:
        return {"error": f"Column '{column}' not found. Available: {list(df.columns)}"}

    series = df[column]
    stats = series.describe().to_dict()
    stats["missing_count"] = series.isna().sum()
    stats["unique_count"] = series.nunique()

    # Normalize every value in one place so the result contains no NumPy scalars.
    return {name: _to_native_scalar(value) for name, value in stats.items()}


def detect_anomalies(
    ctx: RunContext[pd.DataFrame], column: str, threshold: float = 3.0
) -> list[dict[str, Any]]:
    """Detect anomalies using the standard deviation method.

    A value is flagged when it lies more than ``threshold`` sample standard
    deviations away from the column mean.

    Args:
        ctx: Run context whose ``deps`` is the DataFrame being analyzed.
        column: Name of the numeric column to scan.
        threshold: Number of standard deviations beyond which a value is
            considered anomalous.

    Returns:
        At most the first 10 flagged rows (in frame order), each as a dict with
        ``index`` (the row label), ``value`` and ``z_score``. An empty list is
        returned when the spread is zero or undefined. If the column is missing
        or not numeric, a one-element list holding an ``"error"`` dict is
        returned instead.
    """
    df = ctx.deps
    if column not in df.columns:
        return [{"error": f"Column '{column}' not found"}]

    series = df[column]
    if not pd.api.types.is_numeric_dtype(series):
        return [{"error": f"Column '{column}' is not numeric"}]

    mean = series.mean()
    std = series.std()

    # std is NaN when fewer than two non-missing values exist and 0 when all
    # values are equal; in both cases no z-score can be computed.
    if pd.isna(std) or std == 0:
        return []

    is_outlier = (series - mean).abs() > (threshold * std)
    flagged = series[is_outlier].head(10)

    return [
        {
            # Row labels are not always integers (string or datetime indexes),
            # so only integer labels are coerced; others are reported as-is.
            "index": int(label) if pd.api.types.is_integer(label) else label,
            "value": float(value),
            "z_score": float((value - mean) / std),
        }
        for label, value in flagged.items()
    ]


def get_column_info(ctx: RunContext[pd.DataFrame]) -> dict[str, Any]:
    """Describe the structure of the dataset held in ``ctx.deps``.

    Returns a dict with the column names (``columns``), the row and column
    counts (``shape``), and each column's dtype rendered as a string
    (``dtypes``), so the agent can orient itself before deeper analysis.
    """
    frame = ctx.deps
    column_names = list(frame.columns)
    dtype_names = dict(zip(frame.columns, map(str, frame.dtypes)))
    dimensions = {"rows": len(frame), "columns": len(column_names)}
    return {"columns": column_names, "shape": dimensions, "dtypes": dtype_names}


# ## Analysis Results Model
#
# Structured output ensures the AI returns consistent, parseable results.


class DataAnalysis(BaseModel):
    """Validated, structured output the agent must produce for an analysis.

    ``key_findings`` and ``recommendations`` are each constrained to between
    three and five entries.
    """

    summary: str = Field(description="High-level summary of the dataset")
    key_findings: list[str] = Field(
        min_length=3,
        max_length=5,
        description="Key findings discovered from the data",
    )
    recommendations: list[str] = Field(
        min_length=3,
        max_length=5,
        description="Actionable recommendations based on the findings",
    )
    columns_analyzed: list[str] = Field(
        description="List of columns that were analyzed"
    )

    def __str__(self) -> str:
        """Render the analysis as a banner-framed, human-readable report."""

        def numbered(entries: list[str]) -> str:
            # One indented "N. text" line per entry, numbered from 1.
            return "\n".join(
                f"  {position}. {entry}" for position, entry in enumerate(entries, 1)
            )

        rule = "=" * 80
        report_lines = [
            "",  # the report opens with a blank line
            rule,
            "ANALYSIS RESULTS",
            rule,
            "",
            "📋 Summary:",
            self.summary,
            "",
            "🔑 Key Findings:",
            numbered(self.key_findings),
            "",
            "💡 Recommendations:",
            numbered(self.recommendations),
            "",
            f"📊 Columns Analyzed: {', '.join(self.columns_analyzed)}",
            rule,
            "",  # joining on this empty entry yields the trailing newline
        ]
        return "\n".join(report_lines)


# ## Creating the AI Agent
#
# We configure the agent with tools and wrap it with PrefectAgent for durability.


def create_data_analyst_agent() -> PrefectAgent[pd.DataFrame, DataAnalysis]:
    """Build the data-analyst agent and wrap it for durable execution.

    The underlying pydantic-ai agent takes a DataFrame as its dependency,
    returns a ``DataAnalysis``, and is given the three analysis tools defined
    in this module. It is then wrapped in a ``PrefectAgent`` with separate
    task settings for model requests and for tool calls. Per the pydantic-ai
    Prefect integration docs, the wrapper runs the agent as a Prefect flow and
    each model request and tool call as a Prefect task.
    """
    analyst_tools = [calculate_statistics, detect_anomalies, get_column_info]
    instructions = (
        "You are an expert data analyst. Analyze the provided dataset using "
        "the available tools. Focus on finding meaningful patterns, anomalies, "
        "and actionable insights. Always start by understanding the dataset "
        "structure with get_column_info."
    )

    base_agent = Agent(
        "openai:gpt-4o",
        name="data-analyst-agent",
        output_type=DataAnalysis,
        deps_type=pd.DataFrame,
        tools=analyst_tools,
        system_prompt=instructions,
    )

    # Model requests: 3 retries with 1s/2s/4s delays and a 60s timeout.
    model_policy = TaskConfig(
        retries=3,
        retry_delay_seconds=[1.0, 2.0, 4.0],
        timeout_seconds=60.0,
    )
    # Tool calls: 2 retries with 0.5s/1s delays; no timeout is configured here.
    tool_policy = TaskConfig(retries=2, retry_delay_seconds=[0.5, 1.0])

    return PrefectAgent(
        base_agent,
        model_task_config=model_policy,
        tool_task_config=tool_policy,
    )


# ## Sample Dataset Generator
#
# Create a realistic sales dataset for demonstration.


@task
def create_sample_dataset() -> pd.DataFrame:
    """Build a 100-row demo sales table containing two outlier sales values.

    Every column repeats a five-value cycle twenty times. In the ``sales``
    column the final cycle ends with 1000 and 2000 instead of the usual
    110 and 145, giving the anomaly detector something to find.

    In production, you'd load real data from a file, database, or API.
    """
    typical_sales = [100, 150, 200, 110, 145]
    final_sales = [100, 150, 200, 1000, 2000]  # last two values are the anomalies

    return pd.DataFrame(
        {
            "product": ["Widget", "Gadget", "Doohickey", "Widget", "Gadget"] * 20,
            "sales": typical_sales * 19 + final_sales,
            "region": ["North", "South", "East", "West", "Central"] * 20,
            "month": list(range(1, 6)) * 20,
        }
    )


# ## Main Analysis Flow
#
# Orchestrate the entire AI analysis workflow with Prefect.


@flow(name="ai-data-analyst", log_prints=True)
async def analyze_dataset_with_ai() -> DataAnalysis:
    """Run the end-to-end AI analysis and return the structured result.

    Steps, in order:
    1. Build the sample dataset via the ``create_sample_dataset`` task.
    2. Create the Prefect-wrapped analyst agent.
    3. Ask the agent to analyze the dataset, passing the DataFrame as ``deps``.
    4. Print the resulting ``DataAnalysis`` and return it.

    Progress messages are emitted with ``print``; the flow is declared with
    ``log_prints=True``.
    """
    request = (
        "Analyze this sales dataset. Identify patterns, anomalies, and provide recommendations."
    )

    # Step 1: dataset
    print("📊 Preparing dataset...")
    dataset = create_sample_dataset()
    print(f"Dataset shape: {dataset.shape}\n")

    # Step 2: agent with the retry/timeout settings defined in its factory
    print("🤖 Initializing AI data analyst...")
    analyst = create_data_analyst_agent()

    # Step 3: run the agent against the dataset
    print("🔍 Running AI analysis...\n")
    run_result = await analyst.run(request, deps=dataset)

    # Step 4: show and hand back the validated output
    analysis = run_result.output
    print(analysis)
    return analysis


# ## Serve the Flow
#
# To get full durable execution with automatic idempotency, serve the flow to create a deployment.
# Deployed flows enable Prefect's transactional semantics for agent operations.

if __name__ == "__main__":
    import os
    import sys

    # The agent is configured with an OpenAI model, so only serve the flow when
    # an API key is present; otherwise explain how to set one and exit non-zero.
    if os.environ.get("OPENAI_API_KEY"):
        # Register the flow as a deployment and keep serving it from this
        # process (see the Prefect docs for `Flow.serve`).
        analyze_dataset_with_ai.serve(
            name="ai-data-analyst-deployment",
            tags=["ai", "pydantic-ai", "data-analysis"],
        )
    else:
        print("❌ Error: OPENAI_API_KEY environment variable not set")
        print("Set it with: export OPENAI_API_KEY='your-key-here'")
        sys.exit(1)

# ## Triggering Flow Runs
#
# Once served, trigger runs via:
#
# **Prefect UI:**
# 1. Navigate to http://localhost:4200
# 2. Go to Deployments → "ai-data-analyst-deployment"
# 3. Click "Run" → "Quick Run"
#
# **CLI:**
# ```bash
# prefect deployment run ai-data-analyst/ai-data-analyst-deployment --watch
# ```
#
# ## Local Testing
#
# For quick local testing without deployment:
# ```python
# import asyncio
# asyncio.run(analyze_dataset_with_ai())
# ```

# ## What Just Happened?
#
# When you serve and trigger this flow, Prefect and `pydantic-ai` work together to create a resilient AI pipeline:
#
# 1. **Deployment Creation** – `serve()` creates a deployment and starts a long-running process that listens for and executes its flow runs
# 2. **Durable AI Execution** – The `PrefectAgent` wrapper makes all AI operations retryable:
#    - LLM calls retry up to 3 times with exponential backoff (1s, 2s, 4s)
#    - Tool calls retry up to 2 times
#    - LLM calls respect a 60s timeout (no timeout is configured for tool calls)
# 3. **Tool Observability** – Each time the AI calls a tool (`get_column_info`, `calculate_statistics`, `detect_anomalies`),
#    the call is run as a Prefect task
# 4. **Structured Results** – Pydantic validates the AI's output, ensuring it matches the expected schema
# 5. **Automatic Idempotency** – When a deployed flow run is retried, Prefect's transactional semantics ensure that
#    completed tasks are skipped and only failed operations are re-executed. This prevents duplicate API calls and
#    wasted compute.
#
# ## Key Takeaways
#
# * **Deploy for Durability** – Use `flow.serve()` or `flow.deploy()` to unlock automatic idempotency and transactional semantics
# * **Retry Intelligence** – Failed flow runs can be retried from the UI, skipping already-completed tasks
# * **Tool Observability** – Every AI decision and tool call is tracked, logged, and independently retryable
# * **Zero Boilerplate** – Just wrap your pydantic-ai agent with `PrefectAgent`
# * **Customizable Policies** – Fine-tune retries, timeouts, and error handling per operation type
#
# **Try it yourself:**
# 1. Set your OpenAI API key: `export OPENAI_API_KEY='your-key'`
# 2. Start the Prefect server: `prefect server start`
# 3. Serve the flow: `uv run -s examples/ai_data_analyst_with_pydantic_ai.py`
# 4. Trigger a run from the UI (http://localhost:4200) or CLI
# 5. Watch all AI operations tracked in real-time
#
# For more on AI orchestration with Prefect:
# - [pydantic-ai + Prefect documentation](https://ai.pydantic.dev/durable_execution/prefect/)
# - [Task configuration and retries](/v3/how-to-guides/workflows/write-and-run#task-configuration)
# - [Workflow deployments](/v3/how-to-guides/deployments/create-deployments)
